Platform Explorer / Nuxeo Platform 2023.6

Contribution org.nuxeo.elasticsearch.core.bulk.contrib--streamProcessor

This contribution is part of XML component org.nuxeo.elasticsearch.core.bulk.contrib inside nuxeo-elasticsearch-core-2023.6.12.jar /OSGI-INF/bulk-contrib.xml

Extension Point

Extension point streamProcessor of component org.nuxeo.runtime.stream.service.

Registration Order

19
The registration order represents the order in which this contribution was registered on its target extension point. This affects the override/merge behaviour when the target service implements it, and is useful for proper customization of existing contributions.
You can influence this order by adding "require" tags in the containing component declaration, to make sure it is resolved after another component (see "Resolution Order" on components).

Contributed Items

  • <streamProcessor class="org.nuxeo.elasticsearch.bulk.IndexAction" defaultConcurrency="2" defaultPartitions="4" enabled="true" name="indexAction">
          <!-- Declares the enabled stream processor "indexAction", backed by the class
               org.nuxeo.elasticsearch.bulk.IndexAction, with a default concurrency of 2
               and 4 default partitions.
               NOTE(review): presumably these defaults apply to any computation or stream
               that has no explicit setting below; confirm. -->
          <!-- Policy "default": up to 20 retries, delay 1s, max delay 60s, and
               continueOnFailure disabled.
               NOTE(review): presumably the delay grows from 1s up to 60s between retries
               and processing stops once retries are exhausted; confirm. -->
          <policy continueOnFailure="false" delay="1s" maxDelay="60s" maxRetries="20" name="default"/>
          <!-- fetch content and build indexing requests -->
          <!-- "bulk/index": computation concurrency 4, stream with 12 partitions
               (both override the processor defaults declared above) -->
          <computation concurrency="4" name="bulk/index"/>
          <stream name="bulk/index" partitions="12"/>
          <!-- submit requests to elastic -->
          <!-- "bulk/bulkIndex": computation concurrency 2, stream with 8 partitions -->
          <computation concurrency="2" name="bulk/bulkIndex"/>
          <stream name="bulk/bulkIndex" partitions="8">
            <!-- Record filter "overflow" attached to the bulk/bulkIndex stream, configured
                 with transient store "default", key prefix "index" and a threshold of 990000.
                 NOTE(review): presumably records larger than thresholdSize (bytes?) are
                 stored in the transient store instead of the stream; confirm. -->
            <filter class="org.nuxeo.ecm.core.transientstore.computation.TransientStoreOverflowRecordFilter" name="overflow">
              <option name="storeName">default</option>
              <option name="prefix">index</option>
              <option name="thresholdSize">990000</option>
            </filter>
          </stream>
          <!-- "bulk/indexCompletion": computation with concurrency 1; no stream settings
               are declared for it in this contribution -->
          <computation concurrency="1" name="bulk/indexCompletion"/>
          <!-- optimal size of the elasticsearch bulk request (5242880 = 5 * 1024 * 1024) -->
          <option name="esBulkSizeBytes">5242880</option>
          <!-- max number of actions in the elasticsearch bulk request -->
          <option name="esBulkActions">1000</option>
          <!-- flush elasticsearch bulk request interval (value is in seconds, per the option name) -->
          <option name="flushIntervalSeconds">5</option>
        </streamProcessor>

XML Source

<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service">
    <!-- Contributes to the "streamProcessor" extension point of the component
         org.nuxeo.runtime.stream.service. -->
    <!-- Declares the enabled stream processor "indexAction", backed by the class
         org.nuxeo.elasticsearch.bulk.IndexAction, with a default concurrency of 2
         and 4 default partitions.
         NOTE(review): presumably these defaults apply to any computation or stream
         that has no explicit setting below; confirm. -->
    <streamProcessor class="org.nuxeo.elasticsearch.bulk.IndexAction" defaultConcurrency="2" defaultPartitions="4" enabled="true" name="indexAction">
      <!-- Policy "default": up to 20 retries, delay 1s, max delay 60s, and
           continueOnFailure disabled.
           NOTE(review): presumably the delay grows from 1s up to 60s between retries
           and processing stops once retries are exhausted; confirm. -->
      <policy continueOnFailure="false" delay="1s" maxDelay="60s" maxRetries="20" name="default"/>
      <!-- fetch content and build indexing requests -->
      <!-- "bulk/index": computation concurrency 4, stream with 12 partitions
           (both override the processor defaults declared above) -->
      <computation concurrency="4" name="bulk/index"/>
      <stream name="bulk/index" partitions="12"/>
      <!-- submit requests to elastic -->
      <!-- "bulk/bulkIndex": computation concurrency 2, stream with 8 partitions -->
      <computation concurrency="2" name="bulk/bulkIndex"/>
      <stream name="bulk/bulkIndex" partitions="8">
        <!-- Record filter "overflow" attached to the bulk/bulkIndex stream, configured
             with transient store "default", key prefix "index" and a threshold of 990000.
             NOTE(review): presumably records larger than thresholdSize (bytes?) are
             stored in the transient store instead of the stream; confirm. -->
        <filter class="org.nuxeo.ecm.core.transientstore.computation.TransientStoreOverflowRecordFilter" name="overflow">
          <option name="storeName">default</option>
          <option name="prefix">index</option>
          <option name="thresholdSize">990000</option>
        </filter>
      </stream>
      <!-- "bulk/indexCompletion": computation with concurrency 1; no stream settings
           are declared for it in this contribution -->
      <computation concurrency="1" name="bulk/indexCompletion"/>
      <!-- optimal size of the elasticsearch bulk request (5242880 = 5 * 1024 * 1024) -->
      <option name="esBulkSizeBytes">5242880</option>
      <!-- max number of actions in the elasticsearch bulk request -->
      <option name="esBulkActions">1000</option>
      <!-- flush elasticsearch bulk request interval (value is in seconds, per the option name) -->
      <option name="flushIntervalSeconds">5</option>
    </streamProcessor>

  </extension>